<?php

namespace App\Services;

use App\Common\Libs\RmqClient;
use App\Consts\GlobalConst;
use App\Consts\MessageConst;
use Illuminate\Support\Facades\Artisan;
use Illuminate\Support\Facades\Log;

/**
 * RabbitMQ publish (enqueue) and consume (dequeue) logic.
 * Class RabbitMQService
 * @package App\Services
 */
class RabbitMQService extends BaseService
{

    /**
     * Build the message structure that is pushed onto the queue.
     *
     * @param mixed $type      Message type; consumer() compares it with the
     *                         GlobalConst::RMQ_QUEUE_TYPE_* constants to pick the target service.
     * @param mixed $callback  Name of the service method consumer() invokes for this message.
     * @param mixed $data      Payload stored under the "data" key.
     * @param int $taskId      Task id stored under "task_id" (defaults to 0).
     * @param int $messageId   Message id stored under "message_id" (defaults to 0).
     * @return array           Array with the keys type, callback, timestamp, task_id, message_id, data.
     */
    public function getFormatBody($type, $callback, $data, $taskId = 0, $messageId = 0)
    {
        return [
            'type' => $type,
            'callback' => $callback,
            'timestamp' => time(),
            'task_id' => $taskId,
            'message_id' => $messageId,
            'data' => $data,
        ];
    }

    /**
     * Entry point for publishing a message to RabbitMQ.
     *
     * Logs the outgoing message, hands it to RmqClient and reports whether the
     * client answered with code 0. When the client throws, the task is marked as
     * failed through MsgSendService before false is returned.
     *
     * @param array $body       Message body (see getFormatBody()).
     * @param string $queue     Target queue name.
     * @param string $exchange  Target exchange name.
     * @return bool             true when the client reports code 0, false otherwise.
     */
    public function publisher($body, $queue = GlobalConst::RMQ_QUEUE_MSG_CENTER, $exchange = GlobalConst::RMQ_EXCHANGE_MSG_CENTER)
    {
        $taskId = !empty($body['task_id']) ? $body['task_id'] : 0;
        $messageId = !empty($body['message_id']) ? $body['message_id'] : 0;
        $info = "[RMQ][API][PUBLISHER]" . " [{$taskId}][{$messageId}] Publish - " . json_encode([
            'body' => $body,
            'queue' => $queue,
            'exchange' => $exchange,
        ]) . PHP_EOL;
        Log::info($info);
        try {
            $rabbitMQ = RmqClient::getInstance();
            $res = $rabbitMQ->publisher($exchange, $queue, $body);
            if ($res['code'] != 0) {
                // Leave a trace of the rejected publish; previously this path returned silently.
                Log::error("[RMQ][API][PUBLISHER]" . " [{$taskId}][{$messageId}] Publish failed - " . json_encode($res));
                return false;
            }
            return true;
        } catch (\Exception $e) {
            // Record the exception itself; the status remark below only keeps the message text.
            Log::error("[RMQ][API][PUBLISHER]" . " [{$taskId}][{$messageId}] Publish exception - " . json_encode([
                'msg' => $e->getMessage(),
                'code' => $e->getCode(),
                'file' => $e->getFile() . ' - ' . $e->getLine(),
            ]));
            // Mark the push task as failed.
            $remark = '推送异常： [Publisher] - ' . $e->getMessage();
            $this->MsgSendService->updateStatus($taskId, MessageConst::TASK_STATUS_FAILED, $remark);
            return false;
        }
    }

    /**
     * Callback invoked for every message taken off the queue.
     *
     * Heartbeat messages (payload has both "heartbeat" and "time") are acknowledged
     * and answered with true. For other messages the method named in "callback" is
     * invoked on DingService or SmsService depending on "type"; the message is then
     * acknowledged and the callback result is returned. Any error raised while
     * processing is logged, the task is marked as failed and false is returned.
     *
     * @param \AMQPEnvelope $envelope  Delivered message.
     * @param \AMQPQueue $queue        Queue the message was delivered from.
     * @return mixed                   true for a heartbeat, false on failure or when no callback ran,
     *                                 otherwise the value returned by the service callback.
     */
    public function consumer(\AMQPEnvelope $envelope, \AMQPQueue $queue)
    {
        $res = false;
        $data = json_decode($envelope->getBody(), true);
        $taskId = !empty($data['task_id']) ? $data['task_id'] : 0;
        $messageId = !empty($data['message_id']) ? $data['message_id'] : 0;
        try {
            $info = "[RMQ][CALLBACK][CONSUMER]" . " [{$taskId}][{$messageId}] Consumer - " . json_encode($data) . PHP_EOL;
            Log::info($info);
            if (!is_array($data)) {
                // Such a payload cannot name a callback; it is still acknowledged below.
                Log::warning("[RMQ][CALLBACK][CONSUMER]" . " [{$taskId}][{$messageId}] Consumer - payload is not a JSON array, no callback executed.");
            }
            // Heartbeat message: acknowledge and return without running any callback.
            if (isset($data['heartbeat']) && isset($data['time'])) {
                $queue->ack($envelope->getDeliveryTag());
                $info = "[RMQ][CALLBACK][CONSUMER]" . " [{$taskId}][{$messageId}] Consumer - shutdown by heartbeat." . PHP_EOL;
                Log::info($info);
                return true;
            }
            // Run the service callback named in the payload.
            if (!empty($data['callback']) && !empty($data['type'])) {
                // NOTE(review): the method name comes straight from the message payload;
                // consider restricting it to an allow-list if the queue can carry untrusted data.
                $method = $data['callback'];
                if (GlobalConst::RMQ_QUEUE_TYPE_DINGTALK == $data['type']) {
                    // DingTalk callback
                    $res = service()->DingService->$method($data);
                } elseif (GlobalConst::RMQ_QUEUE_TYPE_MSG == $data['type']) {
                    // SMS callback
                    $res = service()->SmsService->$method($data);
                } else {
                    // Other types: no handler yet, treated as success.
                    $res = true;
                }
            }
            // Log the callback result.
            $info = "[RMQ][CALLBACK][RES]" . " [{$taskId}][{$messageId}] Consumer - " . json_encode($res) . PHP_EOL;
            Log::info($info);
            // Explicitly acknowledge the message after processing.
            $queue->ack($envelope->getDeliveryTag());
        } catch (\Throwable $e) {
            // \Throwable rather than \Exception: the dynamic callback invocation above raises
            // \Error (e.g. undefined method, TypeError), which must be logged and recorded too.
            $flag = GlobalConst::ERROR_FILE_FLAG;
            // Log scalar message details; encoding the AMQP objects themselves yields no useful data.
            $info = "[RMQ][CALLBACK][CATCH] " . json_encode([
                    'msg' => $e->getMessage(),
                    'code' => $e->getCode(),
                    'status' => 'failed',
                    'result' => [
                        'file' => $flag . ' : ' . $e->getFile() . ' - ' . $e->getLine(),
                        'data' => [
                            'body' => $envelope->getBody(),
                            'delivery_tag' => $envelope->getDeliveryTag(),
                            'queue' => $queue->getName(),
                        ],
                    ]
                ], JSON_PARTIAL_OUTPUT_ON_ERROR);
            Log::error($info);
            // Mark the push task as failed.
            $remark = '推送异常： [Consumer] - ' . $e->getMessage();
            $this->MsgSendService->updateStatus($taskId, MessageConst::TASK_STATUS_FAILED, $remark);
            // NOTE(review): the message is neither acked nor nacked on this path - confirm this
            // is intended until retry support exists.
            // @todo add retry support
            return false;
        }
        return $res;
    }

    /**
     * Start the MQ worker via the "rabbitmq:work" Artisan command.
     *
     * @return mixed Value returned by Artisan::call().
     */
    public static function startMq()
    {
        return Artisan::call('rabbitmq:work', ['--action' => 'start']);
    }

    /**
     * Stop the MQ worker via the "rabbitmq:work" Artisan command.
     *
     * @return mixed Value returned by Artisan::call().
     */
    public static function stopMq()
    {
        return Artisan::call('rabbitmq:work', ['--action' => 'stop']);
    }

    /**
     * Reload the MQ worker via the "rabbitmq:work" Artisan command.
     *
     * @return mixed Value returned by Artisan::call().
     */
    public static function reloadMq()
    {
        return Artisan::call('rabbitmq:work', ['--action' => 'reload']);
    }

}